package com.xiaofan.ct.producer;

import com.xiaofan.ct.common.bean.Producer;
import com.xiaofan.ct.producer.bean.LocalFileProducer;
import com.xiaofan.ct.producer.io.LocalFileDataIn;
import com.xiaofan.ct.producer.io.LocalFileDataOut;

import java.io.IOException;

/**
 * Command-line launcher for the data producer.
 * <p>
 * Run the producer: java -jar jar/ct-producer.jar ./in/contact.log ./out/call.log
 * <p>
 * Reference commands for the surrounding Kafka / Flume pipeline:
 * <p>
 * List topics: bin/kafka-topics.sh --list --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181
 * <p>
 * Create a topic: bin/kafka-topics.sh --create --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --replication-factor 2 --partitions 3 --topic ct
 * <p>
 * Delete a topic: bin/kafka-topics.sh --delete --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --topic sensor
 * <p>
 * Produce data: bin/kafka-console-producer.sh --broker-list 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --topic sensor
 * <p>
 * Consume data: bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --from-beginning --topic ct
 * <p>
 * Flume agent: bin/flume-ng agent -c conf/ -n a1 -f /home/hadoop/fanjh/china-telecom/flume/flume-2-kafka.conf
 */
public class Bootstrap {

    /**
     * Builds a {@link LocalFileProducer}, attaches a {@link LocalFileDataIn} created from
     * the first argument and a {@link LocalFileDataOut} created from the second argument,
     * runs {@code produce()} and then closes the producer.
     * <p>
     * When fewer than two arguments are supplied, a usage message is printed to standard
     * output and the JVM exits with status 1.
     *
     * @param args {@code args[0]} is passed to {@link LocalFileDataIn} and {@code args[1]}
     *             to {@link LocalFileDataOut}; any further arguments are ignored
     * @throws IOException propagated from the producer calls (presumably from the
     *                     underlying file I/O; the producer types are declared elsewhere)
     */
    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("系统参数不正确，请按照指定格式传递：java -jar Produce.jar path1 path2 ");
            System.exit(1);
        }

        // Build the producer and wire its input and output from the command line.
        Producer producer = new LocalFileProducer();
        producer.setIn(new LocalFileDataIn(args[0]));
        producer.setOut(new LocalFileDataOut(args[1]));

        try {
            // Generate the data.
            producer.produce();
        } finally {
            // Always close the producer, even when produce() fails part-way through,
            // so that it is not left open on the error path.
            producer.close();
        }
    }
}
